package main

import (
	"context"
	"fmt"
	"github.com/hibiken/asynq"
	"github.com/redis/go-redis/v9"
	"github.com/spf13/pflag"
	"log"
	"math/rand"
	"time"
)

// main parses the command-line flags, registers a periodic "example_task"
// with an asynq scheduler that only enqueues while this node holds the leader
// lock, optionally starts an in-process worker, and then blocks in the
// scheduler until it returns.
func main() {
	nodeID := pflag.StringP("node-id", "n", "1", "node id")
	enableWorker := pflag.BoolP("worker", "w", false, "enable work")
	pflag.Parse()

	log.Printf("node id: %s\n", *nodeID)
	log.Printf("enable worker: %v\n", *enableWorker)

	// Schedule in the Asia/Shanghai timezone instead of the default UTC timezone.
	loc, err := time.LoadLocation("Asia/Shanghai")
	if err != nil {
		log.Fatalf("loading timezone: %v", err)
	}

	redisConnOpt := asynq.RedisClientOpt{
		Addr: "localhost:6379",
	}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// The leader lock talks to the same Redis instance as the scheduler, so
	// reuse the address instead of repeating the literal.
	leaderLock := NewLeaderLock(ctx, redisConnOpt.Addr)

	scheduler := asynq.NewScheduler(
		redisConnOpt,
		&asynq.SchedulerOpts{
			Location: loc,
			// The hook has no return value, so panicking is the only way it
			// can stop a non-leader from enqueueing.
			// NOTE(review): confirm the scheduler recovers panics raised in
			// this hook; if it does not, a non-leader node will crash here.
			PreEnqueueFunc: func(task *asynq.Task, opts []asynq.Option) {
				if !leaderLock.IsLeader() {
					panic("not leader")
				}
			},
		},
	)

	// The payload carries the node id so a worker can tell which node
	// scheduled the task.
	task := asynq.NewTask("example_task", []byte(*nodeID))

	// You can use "@every <duration>" to specify the interval.
	entryID, err := scheduler.Register("@every 10s", task)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("registered an entry: %q\n", entryID)

	if *enableWorker {
		go workerMain()
	}

	runErr := scheduler.Run()

	// Cancel ctx first so nothing tied to it races with the resignation, then
	// hand leadership back immediately instead of leaving the key to expire.
	// This must happen before log.Fatal, which exits without running defers.
	cancel()
	leaderLock.resign()

	if runErr != nil {
		log.Fatal(runErr)
	}
}

// workerMain starts an asynq server against the local Redis instance with a
// single handler for "example_task" that logs the task payload. It blocks in
// the server's Run and terminates the process if Run reports an error.
func workerMain() {
	// Handler for "example_task": print the payload as a string.
	printPayload := func(_ context.Context, t *asynq.Task) error {
		log.Printf("payload = %v\n", string(t.Payload()))
		return nil
	}

	// The mux routes each task type to its handler.
	handlers := asynq.NewServeMux()
	handlers.HandleFunc("example_task", printPayload)

	cfg := asynq.Config{
		// Number of concurrent workers.
		Concurrency: 10,
		// Queues to consume from, each with its priority value.
		Queues: map[string]int{
			"critical": 6,
			"default":  3,
			"low":      1,
		},
	}
	worker := asynq.NewServer(asynq.RedisClientOpt{Addr: "localhost:6379"}, cfg)

	err := worker.Run(handlers)
	if err != nil {
		log.Fatalf("could not run server: %v", err)
	}
}

// NewLeaderLock creates a leader lock backed by the Redis server at redisAddr
// and starts its election loop in a background goroutine. The loop runs until
// ctx is cancelled; a nil ctx is treated as context.Background().
//
// Each lock gets a random "host-<n>" identifier, which is the value written
// to the "asynq_leader" key when this node becomes leader.
func NewLeaderLock(ctx context.Context, redisAddr string) *LeaderLock {
	if ctx == nil {
		ctx = context.Background()
	}

	client := redis.NewClient(&redis.Options{
		Addr: redisAddr,
	})

	// Draw the identifier from a generator seeded here. Relying on the global
	// source is not safe on toolchains where it starts from a fixed seed,
	// because every node would then compute the same identifier and all of
	// them would consider themselves leader.
	// NOTE(review): a random identifier is presumably used instead of
	// os.Hostname so several nodes on one machine stay distinct.
	rng := rand.New(rand.NewSource(time.Now().UnixNano()))
	hostname := fmt.Sprintf("host-%d", rng.Uint64())
	log.Printf("hostname: %s\n", hostname)

	l := &LeaderLock{
		redis: client,
		host:  hostname,
		key:   "asynq_leader",
		// run watches ctx for cancellation, so it has to be stored.
		ctx: ctx,
	}

	// Without the election loop nobody ever writes the leader key and
	// IsLeader stays false on every node.
	go l.run()

	return l
}

// LeaderLock is a leader-election handle built on a single Redis key: the
// node whose identifier is stored under the key is the leader.
type LeaderLock struct {
	// host is this node's identifier; IsLeader compares it with the value
	// stored under key.
	host  string
	// key is the Redis key that holds the current leader's identifier.
	key   string
	// redis is the client used for every lock operation.
	redis *redis.Client
	// ctx stops the run loop once its Done channel is closed.
	ctx   context.Context
}

// acquireLock tries to claim leadership by writing this node's identifier to
// the leader key with a one-minute TTL. The write only happens when the key
// does not exist yet, so a current leader is never overwritten. Failures are
// logged and otherwise ignored; the caller simply retries later.
func (l *LeaderLock) acquireLock() {
	// SetNX is atomic on its own, so no separate existence check is needed
	// (one would only add a round trip and a window for races).
	won, err := l.redis.SetNX(context.Background(), l.key, l.host, time.Minute).Result()
	if err != nil {
		log.Printf("leader lock: acquiring %q: %v", l.key, err)
		return
	}
	if won {
		log.Printf("leader lock: %s acquired %q", l.host, l.key)
	}
}

// resign gives up leadership by deleting the leader key, but only if the key
// still holds this node's identifier. Failures are logged; there is nothing
// else to do because the key expires on its own.
func (l *LeaderLock) resign() {
	// Compare and delete in one server-side step. With a separate read and
	// delete, the key could expire and be claimed by another node in between,
	// and the delete would then remove that node's lock.
	const releaseScript = `
if redis.call("GET", KEYS[1]) == ARGV[1] then
	return redis.call("DEL", KEYS[1])
end
return 0`

	err := l.redis.Eval(context.Background(), releaseScript, []string{l.key}, l.host).Err()
	if err != nil {
		log.Printf("leader lock: releasing %q: %v", l.key, err)
	}
}

// extendLock resets the time-to-live of key to ttl. Errors, and the case
// where the expiry could not be set, are logged so that a lost lock does not
// go unnoticed.
func (l *LeaderLock) extendLock(key string, ttl time.Duration) {
	ok, err := l.redis.Expire(context.Background(), key, ttl).Result()
	if err != nil {
		log.Printf("leader lock: extending %q: %v", key, err)
		return
	}
	if !ok {
		// EXPIRE reports false when no timeout was set, e.g. because the key
		// has already disappeared.
		log.Printf("leader lock: %q could not be extended; it may have expired", key)
	}
}

// IsLeader reports whether the leader key currently holds this node's
// identifier. Any error from reading the key is treated as "not leader".
func (l *LeaderLock) IsLeader() bool {
	holder, err := l.redis.Get(context.Background(), l.key).Result()
	return err == nil && holder == l.host
}

// run is the election loop. It makes one attempt immediately and then one
// every 30 seconds: if this node is the leader it extends the lock to one
// minute, otherwise it tries to acquire it. The loop returns once the lock's
// context is cancelled; a lock without a context runs until the process exits.
func (l *LeaderLock) run() {
	const (
		interval = 30 * time.Second
		ttl      = time.Minute
	)

	// Receiving from a nil channel blocks forever, so a missing context just
	// disables the stop case instead of panicking on Done().
	var done <-chan struct{}
	if l.ctx != nil {
		done = l.ctx.Done()
	}

	refresh := func() {
		if l.IsLeader() {
			l.extendLock(l.key, ttl)
			return
		}
		l.acquireLock()
	}

	// Campaign right away; otherwise there is no leader at all until the
	// first tick.
	refresh()

	t := time.NewTicker(interval)
	defer t.Stop()

	for {
		select {
		case <-t.C:
			refresh()
		case <-done:
			return
		}
	}
}
